1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.util;
17
18 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19
20 import rx.Subscription;
21 import rx.functions.Func1;
22
23
24
25
26
27
28 public final class SubscriptionIndexedRingBuffer<T extends Subscription> implements Subscription {
29
30 private volatile IndexedRingBuffer<T> subscriptions = IndexedRingBuffer.getInstance();
31 private volatile int unsubscribed = 0;
32 @SuppressWarnings("rawtypes")
33 private final static AtomicIntegerFieldUpdater<SubscriptionIndexedRingBuffer> UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(SubscriptionIndexedRingBuffer.class, "unsubscribed");
34
35 public SubscriptionIndexedRingBuffer() {
36 }
37
38 @Override
39 public boolean isUnsubscribed() {
40 return unsubscribed == 1;
41 }
42
43
44
45
46
47
48
49
50
51
52
53 public synchronized int add(final T s) {
54
55 if (unsubscribed == 1 || subscriptions == null) {
56 s.unsubscribe();
57 return -1;
58 } else {
59 int n = subscriptions.add(s);
60
61 if (unsubscribed == 1) {
62 s.unsubscribe();
63 }
64 return n;
65 }
66 }
67
68
69
70
71
72
73 public void remove(final int n) {
74 if (unsubscribed == 1 || subscriptions == null || n < 0) {
75 return;
76 }
77 Subscription t = subscriptions.remove(n);
78 if (t != null) {
79
80 if (t != null) {
81 t.unsubscribe();
82 }
83 }
84 }
85
86
87
88
89
90
91 public void removeSilently(final int n) {
92 if (unsubscribed == 1 || subscriptions == null || n < 0) {
93 return;
94 }
95 subscriptions.remove(n);
96 }
97
98 @Override
99 public void unsubscribe() {
100 if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && subscriptions != null) {
101
102 unsubscribeFromAll(subscriptions);
103
104 IndexedRingBuffer<T> s = subscriptions;
105 subscriptions = null;
106 s.unsubscribe();
107 }
108 }
109
110 public int forEach(Func1<T, Boolean> action) {
111 return forEach(action, 0);
112 }
113
114
115
116
117
118
119 public synchronized int forEach(Func1<T, Boolean> action, int startIndex) {
120
121 if (unsubscribed == 1 || subscriptions == null) {
122 return 0;
123 }
124 return subscriptions.forEach(action, startIndex);
125 }
126
127 private static void unsubscribeFromAll(IndexedRingBuffer<? extends Subscription> subscriptions) {
128 if (subscriptions == null) {
129 return;
130 }
131
132
133 subscriptions.forEach(UNSUBSCRIBE);
134 }
135
136 private final static Func1<Subscription, Boolean> UNSUBSCRIBE = new Func1<Subscription, Boolean>() {
137
138 @Override
139 public Boolean call(Subscription s) {
140 s.unsubscribe();
141 return Boolean.TRUE;
142 }
143 };
144
145 }